// Copyright 2018 The Go Cloud Development Kit Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package drivertest provides a conformance test for implementations of
// driver.
package drivertest // import "gocloud.dev/pubsub/drivertest"

import (
	"bytes"
	"context"
	"errors"
	"sort"
	"strconv"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
	"gocloud.dev/gcerrors"
	"gocloud.dev/internal/escape"
	"gocloud.dev/internal/retry"
	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/batcher"
	"gocloud.dev/pubsub/driver"
	"golang.org/x/sync/errgroup"
)

// Harness describes the functionality test harnesses must provide to run
// conformance tests.
type Harness interface {
	// CreateTopic creates a new topic and returns a driver.Topic
	// for testing. The topic may have to be removed manually if the test is
	// abruptly terminated or the network connection fails.
	// The conformance tests call the returned cleanup function once they are
	// done with the topic.
	CreateTopic(ctx context.Context, testName string) (dt driver.Topic, cleanup func(), err error)

	// MakeNonexistentTopic makes a driver.Topic referencing a topic that
	// does not exist. The conformance tests expect this call to succeed and
	// a later send on the topic to fail with code NotFound.
	MakeNonexistentTopic(ctx context.Context) (driver.Topic, error)

	// CreateSubscription creates a new subscription, subscribed
	// to the given topic, and returns a driver.Subscription for testing. The
	// subscription may have to be cleaned up manually if the test is abruptly
	// terminated or the network connection fails.
	// The conformance tests call the returned cleanup function once they are
	// done with the subscription.
	CreateSubscription(ctx context.Context, t driver.Topic, testName string) (ds driver.Subscription, cleanup func(), err error)

	// MakeNonexistentSubscription makes a driver.Subscription referencing a
	// subscription that does not exist. The conformance tests expect this
	// call to succeed and a later receive to fail with code NotFound.
	MakeNonexistentSubscription(ctx context.Context) (ds driver.Subscription, cleanup func(), err error)

	// Close closes resources used by the harness, but does not call Close
	// on the Topics and Subscriptions generated by the Harness.
	Close()

	// MaxBatchSizes returns the maximum size of SendBatch/Send(Na|A)cks, or 0
	// if there's no max. The first result is the limit for send batches and
	// the second is the limit for ack/nack batches.
	MaxBatchSizes() (int, int)

	// SupportsMultipleSubscriptions reports whether the driver supports
	// multiple subscriptions for the same topic.
	SupportsMultipleSubscriptions() bool
}

// HarnessMaker describes functions that construct a harness for running tests.
// It is called exactly once per test, with that test's *testing.T;
// Harness.Close() will be called when the test is complete.
type HarnessMaker func(ctx context.Context, t *testing.T) (Harness, error)

// AsTest represents a test of As functionality.
// The conformance test:
//  1. Calls TopicCheck.
//  2. Calls SubscriptionCheck.
//  3. Sends a message, setting Message.BeforeSend to BeforeSend
//     and Message.AfterSend to AfterSend.
//  4. Receives the message and calls MessageCheck.
//  5. Calls TopicErrorCheck.
//  6. Calls SubscriptionErrorCheck.
type AsTest interface {
	// Name should return a descriptive name for the test.
	// It must not be empty; the conformance test fails otherwise.
	Name() string
	// TopicCheck will be called to allow verification of Topic.As.
	TopicCheck(t *pubsub.Topic) error
	// SubscriptionCheck will be called to allow verification of Subscription.As.
	SubscriptionCheck(s *pubsub.Subscription) error
	// TopicErrorCheck will be called to allow verification of Topic.ErrorAs.
	// The error will be the one returned from SendBatch when called with
	// a non-existent topic.
	TopicErrorCheck(t *pubsub.Topic, err error) error
	// SubscriptionErrorCheck will be called to allow verification of
	// Subscription.ErrorAs.
	// The error will be the one returned from ReceiveBatch when called with
	// a non-existent subscription.
	SubscriptionErrorCheck(s *pubsub.Subscription, err error) error
	// MessageCheck will be called to allow verification of Message.As.
	MessageCheck(m *pubsub.Message) error
	// BeforeSend will be used as Message.BeforeSend as part of sending a test
	// message.
	BeforeSend(as func(interface{}) bool) error
	// AfterSend will be used as Message.AfterSend as part of sending a test
	// message.
	AfterSend(as func(interface{}) bool) error
}

// batchSizeOne is a batcher configuration with MaxBatchSize and MaxHandlers
// both set to 1. Many tests set the maximum batch size to 1 to make
// record/replay stable.
var batchSizeOne = &batcher.Options{MaxBatchSize: 1, MaxHandlers: 1}

// verifyAsFailsOnNil is an AsTest that is always run. It checks that every
// As function reports false when handed nil, and that every ErrorAs function
// panics when handed a nil target.
type verifyAsFailsOnNil struct{}

// Name returns the descriptive name of this AsTest.
func (verifyAsFailsOnNil) Name() string {
	const name = "verify As returns false when passed nil"
	return name
}

// TopicCheck fails if Topic.As accepts nil.
func (verifyAsFailsOnNil) TopicCheck(t *pubsub.Topic) error {
	if ok := t.As(nil); !ok {
		return nil
	}
	return errors.New("want Topic.As to return false when passed nil")
}

// SubscriptionCheck fails if Subscription.As accepts nil.
func (verifyAsFailsOnNil) SubscriptionCheck(s *pubsub.Subscription) error {
	if ok := s.As(nil); !ok {
		return nil
	}
	return errors.New("want Subscription.As to return false when passed nil")
}

// TopicErrorCheck fails unless Topic.ErrorAs panics on a nil target.
// The named result lets the deferred function report the failure.
func (verifyAsFailsOnNil) TopicErrorCheck(t *pubsub.Topic, err error) (ret error) {
	defer func() {
		if r := recover(); r != nil {
			// ErrorAs panicked, which is what we want; ret stays nil.
			return
		}
		ret = errors.New("want Topic.ErrorAs to panic when passed nil")
	}()
	t.ErrorAs(err, nil)
	return nil
}

// SubscriptionErrorCheck fails unless Subscription.ErrorAs panics on a nil
// target. The named result lets the deferred function report the failure.
func (verifyAsFailsOnNil) SubscriptionErrorCheck(s *pubsub.Subscription, err error) (ret error) {
	defer func() {
		if r := recover(); r != nil {
			// ErrorAs panicked, which is what we want; ret stays nil.
			return
		}
		ret = errors.New("want Subscription.ErrorAs to panic when passed nil")
	}()
	s.ErrorAs(err, nil)
	return nil
}

// MessageCheck fails if Message.As accepts nil.
func (verifyAsFailsOnNil) MessageCheck(m *pubsub.Message) error {
	if ok := m.As(nil); !ok {
		return nil
	}
	return errors.New("want Message.As to return false when passed nil")
}

// BeforeSend fails if the As function given to Message.BeforeSend accepts nil.
func (verifyAsFailsOnNil) BeforeSend(as func(interface{}) bool) error {
	if ok := as(nil); !ok {
		return nil
	}
	return errors.New("want Message.BeforeSend's As function to return false when passed nil")
}

// AfterSend fails if the As function given to Message.AfterSend accepts nil.
func (verifyAsFailsOnNil) AfterSend(as func(interface{}) bool) error {
	if ok := as(nil); !ok {
		return nil
	}
	return errors.New("want Message.AfterSend's As function to return false when passed nil")
}

// RunConformanceTests runs conformance tests for driver implementations of pubsub.
//
// Every test in the fixed table below is run as a subtest of t, each with its
// own harness obtained from newHarness. Afterwards a "TestAs" subtest runs
// testAs once for each entry of asTests, plus a built-in test that verifies
// the behavior of the As functions when passed nil. Every AsTest must have a
// non-empty Name.
//
// The caller's asTests slice is never modified.
func RunConformanceTests(t *testing.T, newHarness HarnessMaker, asTests []AsTest) {
	tests := map[string]func(t *testing.T, newHarness HarnessMaker){
		"TestSendReceive":                          testSendReceive,
		"TestSendReceiveTwo":                       testSendReceiveTwo,
		"TestSendReceiveJSON":                      testSendReceiveJSON,
		"TestNack":                                 testNack,
		"TestBatching":                             testBatching,
		"TestDoubleAck":                            testDoubleAck,
		"TestErrorOnSendToClosedTopic":             testErrorOnSendToClosedTopic,
		"TestErrorOnReceiveFromClosedSubscription": testErrorOnReceiveFromClosedSubscription,
		"TestCancelSendReceive":                    testCancelSendReceive,
		"TestNonExistentTopicSucceedsOnOpenButFailsOnSend":           testNonExistentTopicSucceedsOnOpenButFailsOnSend,
		"TestNonExistentSubscriptionSucceedsOnOpenButFailsOnReceive": testNonExistentSubscriptionSucceedsOnOpenButFailsOnReceive,
		"TestMetadata":           testMetadata,
		"TestNonUTF8MessageBody": testNonUTF8MessageBody,
	}
	for name, test := range tests {
		t.Run(name, func(t *testing.T) { test(t, newHarness) })
	}

	// Copy into a fresh slice instead of appending to asTests directly: if the
	// caller's slice has spare capacity, append would write the extra element
	// into the caller's backing array.
	allAsTests := make([]AsTest, 0, len(asTests)+1)
	allAsTests = append(allAsTests, asTests...)
	allAsTests = append(allAsTests, verifyAsFailsOnNil{})
	t.Run("TestAs", func(t *testing.T) {
		for _, st := range allAsTests {
			if st.Name() == "" {
				t.Fatalf("AsTest.Name is required")
			}
			t.Run(st.Name(), func(t *testing.T) { testAs(t, newHarness, st) })
		}
	})
}

// RunBenchmarks runs benchmarks for driver implementations of pubsub.
// It runs two sub-benchmarks against the given topic and subscription:
// "BenchmarkReceive", which times receiving, and "BenchmarkSend", which
// times sending.
func RunBenchmarks(b *testing.B, topic *pubsub.Topic, sub *pubsub.Subscription) {
	const (
		timeReceive = false
		timeSend    = true
	)
	b.Run("BenchmarkReceive", func(rb *testing.B) { benchmark(rb, topic, sub, timeReceive) })
	b.Run("BenchmarkSend", func(sb *testing.B) { benchmark(sb, topic, sub, timeSend) })
}

// testNonExistentTopicSucceedsOnOpenButFailsOnSend verifies that a topic that
// does not exist can still be opened, and that sending to it then fails with
// code NotFound.
func testNonExistentTopicSucceedsOnOpenButFailsOnSend(t *testing.T, newHarness HarnessMaker) {
	ctx := context.Background()
	harness, err := newHarness(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer harness.Close()

	// Opening must not fail; errors for non-existent topics should only
	// surface once a message is sent.
	driverTopic, err := harness.MakeNonexistentTopic(ctx)
	if err != nil {
		t.Fatalf("creating a local topic that doesn't exist on the server: %v", err)
	}
	topic := pubsub.NewTopic(driverTopic, nil)
	defer func() {
		if shutdownErr := topic.Shutdown(ctx); shutdownErr != nil {
			t.Error(shutdownErr)
		}
	}()

	sendErr := topic.Send(ctx, &pubsub.Message{})
	if sendErr != nil && gcerrors.Code(sendErr) == gcerrors.NotFound {
		return
	}
	t.Errorf("got error %v for send to non-existent topic, want code=NotFound", sendErr)
}

// testNonExistentSubscriptionSucceedsOnOpenButFailsOnReceive verifies that a
// subscription that does not exist can still be opened, and that receiving
// from it then fails with code NotFound before the timeout expires.
func testNonExistentSubscriptionSucceedsOnOpenButFailsOnReceive(t *testing.T, newHarness HarnessMaker) {
	ctx := context.Background()
	harness, err := newHarness(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer harness.Close()

	driverSub, subCleanup, err := harness.MakeNonexistentSubscription(ctx)
	if err != nil {
		t.Fatalf("failed to make non-existent subscription: %v", err)
	}
	defer subCleanup()
	sub := pubsub.NewSubscription(driverSub, batchSizeOne, batchSizeOne)
	defer func() {
		if shutdownErr := sub.Shutdown(ctx); shutdownErr != nil {
			t.Error(shutdownErr)
		}
	}()

	// Receive would block forever if no error is produced, so bound it.
	recvCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	_, recvErr := sub.Receive(recvCtx)
	gotNotFound := recvErr != nil && recvCtx.Err() == nil && gcerrors.Code(recvErr) == gcerrors.NotFound
	if !gotNotFound {
		t.Errorf("got error %v for receive from non-existent subscription, want code=NotFound", recvErr)
	}
}

// testSendReceive publishes three messages, receives them back, and checks
// that each received message has a LoggableID and that the received set
// matches the sent set.
func testSendReceive(t *testing.T, newHarness HarnessMaker) {
	ctx := context.Background()
	harness, err := newHarness(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer harness.Close()
	topic, sub, cleanup, err := makePair(ctx, t, harness)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	sent := publishN(ctx, t, topic, 3)
	received := receiveN(ctx, t, sub, len(sent))

	// Every received message must carry a LoggableID.
	for i := range received {
		if received[i].LoggableID == "" {
			t.Errorf("msg.LoggableID was empty, should be set")
		}
	}

	// The received messages must be the ones that were sent.
	if d := diffMessageSets(received, sent); d != "" {
		t.Error(d)
	}
}

// testSendReceiveTwo attaches two subscriptions to one topic, publishes
// messages, and verifies that each subscription receives all of them.
// It is skipped when the harness reports that multiple subscriptions per
// topic are unsupported.
func testSendReceiveTwo(t *testing.T, newHarness HarnessMaker) {
	const nSubscriptions = 2

	ctx := context.Background()
	harness, err := newHarness(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer harness.Close()
	if !harness.SupportsMultipleSubscriptions() {
		t.Skip("multiple subscriptions to a topic not supported")
	}

	driverTopic, topicCleanup, err := harness.CreateTopic(ctx, t.Name())
	if err != nil {
		t.Fatal(err)
	}
	defer topicCleanup()
	topic := pubsub.NewTopic(driverTopic, batchSizeOne)
	defer func() {
		if shutdownErr := topic.Shutdown(ctx); shutdownErr != nil {
			t.Error(shutdownErr)
		}
	}()

	var subs []*pubsub.Subscription
	for len(subs) < nSubscriptions {
		driverSub, subCleanup, err := harness.CreateSubscription(ctx, driverTopic, t.Name())
		if err != nil {
			t.Fatal(err)
		}
		defer subCleanup()
		sub := pubsub.NewSubscription(driverSub, batchSizeOne, batchSizeOne)
		defer func() {
			if shutdownErr := sub.Shutdown(ctx); shutdownErr != nil {
				t.Error(shutdownErr)
			}
		}()
		subs = append(subs, sub)
	}

	sent := publishN(ctx, t, topic, 3)
	for idx, sub := range subs {
		received := receiveN(ctx, t, sub, len(sent))
		if d := diffMessageSets(received, sent); d != "" {
			t.Errorf("sub #%d: %s", idx, d)
		}
	}
}

// testSendReceiveJSON sends a single message whose body is a JSON document
// and verifies that it is received unchanged.
func testSendReceiveJSON(t *testing.T, newHarness HarnessMaker) {
	const payload = `{"Foo": "Bar"}`

	ctx := context.Background()
	harness, err := newHarness(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer harness.Close()
	topic, sub, cleanup, err := makePair(ctx, t, harness)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	sent := &pubsub.Message{Body: []byte(payload)}
	if sendErr := topic.Send(ctx, sent); sendErr != nil {
		t.Fatal(sendErr)
	}

	// Bound the receive so a lost message does not hang the test.
	recvCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	received, err := sub.Receive(recvCtx)
	if err != nil {
		t.Fatal(err)
	}
	received.Ack()

	got, want := []*pubsub.Message{received}, []*pubsub.Message{sent}
	if d := diffMessageSets(got, want); d != "" {
		t.Error(d)
	}
}

// testNack publishes messages, receives and nacks all of them, and then
// verifies that the same messages are delivered a second time. It is skipped
// when the driver subscription reports that it cannot nack.
func testNack(t *testing.T, newHarness HarnessMaker) {
	const nMessages = 2

	ctx := context.Background()
	harness, err := newHarness(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer harness.Close()
	driverTopic, topicCleanup, err := harness.CreateTopic(ctx, t.Name())
	if err != nil {
		t.Fatal(err)
	}
	defer topicCleanup()
	driverSub, subCleanup, err := harness.CreateSubscription(ctx, driverTopic, t.Name())
	if err != nil {
		t.Fatal(err)
	}
	defer subCleanup()
	if !driverSub.CanNack() {
		t.Skip("Nack not supported")
	}
	topic := pubsub.NewTopic(driverTopic, batchSizeOne)
	defer func() {
		if shutdownErr := topic.Shutdown(ctx); shutdownErr != nil {
			t.Error(shutdownErr)
		}
	}()
	sub := pubsub.NewSubscription(driverSub, batchSizeOne, batchSizeOne)
	defer func() {
		if shutdownErr := sub.Shutdown(ctx); shutdownErr != nil {
			t.Error(shutdownErr)
		}
	}()

	sent := publishN(ctx, t, topic, nMessages)

	// First delivery. Nack only after everything has been received;
	// otherwise one message could arrive twice in place of another.
	// Receive blocks if messages never arrive, so bound it with a timeout.
	firstCtx, cancelFirst := context.WithTimeout(ctx, 30*time.Second)
	defer cancelFirst()
	var firstDelivery []*pubsub.Message
	for len(firstDelivery) < nMessages {
		msg, err := sub.Receive(firstCtx)
		if err != nil {
			t.Fatal(err)
		}
		firstDelivery = append(firstDelivery, msg)
	}
	for i := range firstDelivery {
		firstDelivery[i].Nack()
	}
	if d := diffMessageSets(firstDelivery, sent); d != "" {
		t.Error(d)
	}

	// Second delivery. Receive blocks if the nacked messages are never
	// redelivered, so bound it with a fresh timeout.
	secondCtx, cancelSecond := context.WithTimeout(ctx, 30*time.Second)
	defer cancelSecond()
	var redelivered []*pubsub.Message
	for len(redelivered) < nMessages {
		msg, err := sub.Receive(secondCtx)
		if err != nil {
			t.Fatal(err)
		}
		redelivered = append(redelivered, msg)
		msg.Ack()
	}
	if d := diffMessageSets(redelivered, sent); d != "" {
		t.Error(d)
	}
}

// testBatching sends and acks messages using batches of half the total
// message count, falling back to batches of one when the harness reports a
// smaller driver limit, and verifies that all messages are received.
func testBatching(t *testing.T, newHarness HarnessMaker) {
	const nMessages = 12 // must be divisible by 2
	const batchSize = nMessages / 2

	ctx := context.Background()
	harness, err := newHarness(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer harness.Close()
	maxSendBatch, maxAckBatch := harness.MaxBatchSizes()

	driverTopic, topicCleanup, err := harness.CreateTopic(ctx, t.Name())
	if err != nil {
		t.Fatal(err)
	}
	defer topicCleanup()
	driverSub, subCleanup, err := harness.CreateSubscription(ctx, driverTopic, t.Name())
	if err != nil {
		t.Fatal(err)
	}
	defer subCleanup()

	// optionsFor picks batcher options for a driver limit (0 means no
	// limit): exact batches of batchSize when the driver allows them,
	// otherwise batches of one.
	optionsFor := func(driverMax int) *batcher.Options {
		if driverMax != 0 && batchSize > driverMax {
			return batchSizeOne
		}
		return &batcher.Options{MinBatchSize: batchSize, MaxBatchSize: batchSize}
	}

	topic := pubsub.NewTopic(driverTopic, optionsFor(maxSendBatch))
	defer func() {
		if shutdownErr := topic.Shutdown(ctx); shutdownErr != nil {
			t.Error(shutdownErr)
		}
	}()
	sub := pubsub.NewSubscription(driverSub, batchSizeOne, optionsFor(maxAckBatch))
	defer func() {
		if shutdownErr := sub.Shutdown(ctx); shutdownErr != nil {
			t.Error(shutdownErr)
		}
	}()

	// Sends have to run concurrently: topic.Send blocks until its message is
	// sent, and nothing is sent until batchSize messages have been queued.
	// All messages share one Body because their order within a SendBatch is
	// not stable.
	sends, sendCtx := errgroup.WithContext(ctx)
	var sent []*pubsub.Message
	for len(sent) < nMessages {
		msg := &pubsub.Message{Body: []byte("hello world")}
		sent = append(sent, msg)
		sends.Go(func() error { return topic.Send(sendCtx, msg) })
	}
	if waitErr := sends.Wait(); waitErr != nil {
		t.Fatal(waitErr)
	}

	// Receive blocks if messages never arrive, so bound it with a timeout.
	recvCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	var received []*pubsub.Message
	for len(received) < nMessages {
		msg, err := sub.Receive(recvCtx)
		if err != nil {
			t.Fatal(err)
		}
		received = append(received, msg)
		msg.Ack()
	}
	if d := diffMessageSets(received, sent); d != "" {
		t.Error(d)
	}
}

// testDoubleAck exercises the driver directly (bypassing the pubsub batching
// layer) to verify that:
//   - acking the same messages twice succeeds;
//   - if the driver supports nacks, nacking messages that were already acked
//     succeeds, and a nacked message that was never acked is redelivered.
func testDoubleAck(t *testing.T, newHarness HarnessMaker) {
	// Set up.
	ctx := context.Background()
	h, err := newHarness(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer h.Close()
	dt, topicCleanup, err := h.CreateTopic(ctx, t.Name())
	if err != nil {
		t.Fatal(err)
	}
	defer topicCleanup()
	ds, subCleanup, err := h.CreateSubscription(ctx, dt, t.Name())
	if err != nil {
		t.Fatal(err)
	}
	defer subCleanup()

	// Publish 3 messages.
	for i := 0; i < 3; i++ {
		err := dt.SendBatch(ctx, []*driver.Message{{Body: []byte(strconv.Itoa(i))}})
		if err != nil {
			t.Fatal(err)
		}
	}

	// Retrieve the messages.
	// The test will hang here if the messages aren't delivered, so use a shorter timeout.
	ctx2, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	var dms []*driver.Message
	for len(dms) != 3 {
		curdms, err := ds.ReceiveBatch(ctx2, 3)
		if err != nil {
			t.Fatal(err)
		}
		// ReceiveBatch may return no messages and no error; stop once the
		// timeout has expired instead of looping forever.
		if err := ctx2.Err(); err != nil {
			t.Fatalf("never received expected messages: %v", err)
		}
		dms = append(dms, curdms...)
	}

	// Ack the first two messages.
	err = ds.SendAcks(ctx, []driver.AckID{dms[0].AckID, dms[1].AckID})
	if err != nil {
		t.Fatal(err)
	}

	// Ack them again; this should succeed even though we've acked them before.
	// If services return an error for this, drivers should drop them.
	err = ds.SendAcks(ctx, []driver.AckID{dms[0].AckID, dms[1].AckID})
	if err != nil {
		t.Fatal(err)
	}

	if !ds.CanNack() {
		return
	}

	// Nack all 3 messages. This should also succeed, and the nack of the third
	// message should take effect, so we should be able to fetch it again.
	// Note that the other messages *may* also be re-sent, because we're nacking
	// them here (even though we acked them earlier); it depends on service
	// semantics and time-sensitivity.
	err = ds.SendNacks(ctx, []driver.AckID{dms[0].AckID, dms[1].AckID, dms[2].AckID})
	if err != nil {
		t.Fatal(err)
	}

	// The test will hang here if the message isn't redelivered, so use a shorter timeout.
	ctx2, cancel = context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	// We're looking to re-receive dms[2].
Loop:
	for {
		curdms, err := ds.ReceiveBatch(ctx2, 1)
		if err != nil {
			t.Fatal(err)
		}
		for _, curdm := range curdms {
			if bytes.Equal(curdm.Body, dms[2].Body) {
				// Found it!
				break Loop
			}
		}
		// Same guard as in the first receive loop: a driver that keeps
		// returning empty (or non-matching) batches without an error must
		// not make this loop spin past the timeout. The check comes after
		// the scan so a message that arrives just in time still counts.
		if err := ctx2.Err(); err != nil {
			t.Fatalf("never re-received nacked message: %v", err)
		}
	}
}

// publishN sends n distinct messages to topic and returns them in the order
// sent. Message i has the decimal string of i as its body and as the value
// of its "a" metadata key. Any send error fails the test immediately.
func publishN(ctx context.Context, t *testing.T, topic *pubsub.Topic, n int) []*pubsub.Message {
	var sent []*pubsub.Message
	for i := 0; i < n; i++ {
		label := strconv.Itoa(i)
		msg := &pubsub.Message{
			Body:     []byte(label),
			Metadata: map[string]string{"a": label},
		}
		if sendErr := topic.Send(ctx, msg); sendErr != nil {
			t.Fatal(sendErr)
		}
		sent = append(sent, msg)
	}
	return sent
}

// receiveN receives n messages from sub, acking each one as it arrives, and
// returns them in arrival order. All receives share a single 30-second
// timeout so the test cannot hang when messages are missing; any receive
// error fails the test immediately.
func receiveN(ctx context.Context, t *testing.T, sub *pubsub.Subscription, n int) []*pubsub.Message {
	recvCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	var received []*pubsub.Message
	for len(received) < n {
		msg, err := sub.Receive(recvCtx)
		if err != nil {
			t.Fatal(err)
		}
		received = append(received, msg)
		msg.Ack()
	}
	return received
}

// diffMessageSets returns a diff between two sets of messages, or "" if they
// match. Messages are compared without regard to order (both sides are
// sorted by Body) and unexported fields are ignored.
//
// As a side effect, LoggableID is cleared on every message in got so that it
// does not take part in the comparison.
func diffMessageSets(got, want []*pubsub.Message) string {
	for i := range got {
		got[i].LoggableID = ""
	}
	sortByBody := cmpopts.SortSlices(func(a, b *pubsub.Message) bool {
		return bytes.Compare(a.Body, b.Body) < 0
	})
	skipUnexported := cmpopts.IgnoreUnexported(pubsub.Message{})
	return cmp.Diff(got, want, sortByBody, skipUnexported)
}

// testErrorOnSendToClosedTopic verifies that, after a topic has been shut
// down, both Send and a second Shutdown return an error.
func testErrorOnSendToClosedTopic(t *testing.T, newHarness HarnessMaker) {
	ctx := context.Background()
	harness, err := newHarness(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer harness.Close()

	driverTopic, topicCleanup, err := harness.CreateTopic(ctx, t.Name())
	if err != nil {
		t.Fatal(err)
	}
	defer topicCleanup()

	topic := pubsub.NewTopic(driverTopic, batchSizeOne)
	if shutdownErr := topic.Shutdown(ctx); shutdownErr != nil {
		t.Error(shutdownErr)
	}

	// Sending on the closed topic must fail.
	sendErr := topic.Send(ctx, &pubsub.Message{})
	if sendErr == nil {
		t.Error("topic.Send returned nil, want error")
	}
	// Shutting down a second time must fail as well.
	secondShutdownErr := topic.Shutdown(ctx)
	if secondShutdownErr == nil {
		t.Error("wanted error on double Shutdown")
	}
}

// testErrorOnReceiveFromClosedSubscription verifies that, after a
// subscription has been shut down, both Receive and a second Shutdown return
// an error.
func testErrorOnReceiveFromClosedSubscription(t *testing.T, newHarness HarnessMaker) {
	ctx := context.Background()
	harness, err := newHarness(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer harness.Close()

	driverTopic, topicCleanup, err := harness.CreateTopic(ctx, t.Name())
	if err != nil {
		t.Fatal(err)
	}
	defer topicCleanup()

	driverSub, subCleanup, err := harness.CreateSubscription(ctx, driverTopic, t.Name())
	if err != nil {
		t.Fatal(err)
	}
	defer subCleanup()

	sub := pubsub.NewSubscription(driverSub, batchSizeOne, batchSizeOne)
	if shutdownErr := sub.Shutdown(ctx); shutdownErr != nil {
		t.Error(shutdownErr)
	}

	// Receiving on the closed subscription must fail.
	_, recvErr := sub.Receive(ctx)
	if recvErr == nil {
		t.Error("sub.Receive returned nil, want error")
	}
	// Shutting down a second time must fail as well.
	secondShutdownErr := sub.Shutdown(ctx)
	if secondShutdownErr == nil {
		t.Error("wanted error on double Shutdown")
	}
}

// testCancelSendReceive verifies that Send and Receive report cancellation
// when called with a context that has already been canceled.
func testCancelSendReceive(t *testing.T, newHarness HarnessMaker) {
	background := context.Background()
	harness, err := newHarness(background, t)
	if err != nil {
		t.Fatal(err)
	}
	defer harness.Close()
	topic, sub, cleanup, err := makePair(background, t, harness)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	// Build a context that is canceled before any operation starts.
	canceledCtx, cancel := context.WithCancel(background)
	cancel()

	sendErr := topic.Send(canceledCtx, &pubsub.Message{})
	if !isCanceled(sendErr) {
		t.Errorf("topic.Send returned %v (%T), want context.Canceled", sendErr, sendErr)
	}
	_, recvErr := sub.Receive(canceledCtx)
	if !isCanceled(recvErr) {
		t.Errorf("sub.Receive returned %v (%T), want context.Canceled", recvErr, recvErr)
	}

	// A test that cancels an in-flight blocking Receive would be valuable.
	// However, pubsub.Subscription.Receive calls driver.ReceiveBatch
	// repeatedly while it returns 0 messages, so the number of calls is
	// timing-dependent, which would make such a test flaky for drivers that
	// use record/replay.
}

// testMetadata verifies that metadata made of "weird" strings (from
// escape.WeirdStrings) survives a send/receive round trip unchanged, and
// that sending metadata with a non-UTF8 key or value is rejected.
func testMetadata(t *testing.T, newHarness HarnessMaker) {
	// Set up.
	ctx := context.Background()
	h, err := newHarness(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer h.Close()

	// Some services limit the number of metadata per message.
	// Sort the escape.WeirdStrings values for record/replay consistency,
	// then break the weird strings up into groups of at most maxMetadataKeys.
	const maxMetadataKeys = 10
	var weirdStrings []string
	for _, v := range escape.WeirdStrings {
		weirdStrings = append(weirdStrings, v)
	}
	sort.Slice(weirdStrings, func(i, j int) bool { return weirdStrings[i] < weirdStrings[j] })

	// Each weird string is used as both key and value. A new (initially
	// empty) group is started whenever the current one becomes full.
	weirdMetaDataGroups := []map[string]string{{}}
	i := 0
	for _, k := range weirdStrings {
		weirdMetaDataGroups[i][k] = k
		if len(weirdMetaDataGroups[i]) == maxMetadataKeys {
			weirdMetaDataGroups = append(weirdMetaDataGroups, map[string]string{})
			i++
		}
	}

	topic, sub, cleanup, err := makePair(ctx, t, h)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	// roundTrip sends one message carrying wm as metadata, receives it back,
	// and checks that the metadata is unchanged. It is a separate function so
	// that the deferred cancel releases each iteration's timeout context when
	// that iteration ends, rather than piling up until testMetadata returns.
	roundTrip := func(wm map[string]string) {
		sent := &pubsub.Message{
			Body:     []byte("hello world"),
			Metadata: wm,
		}
		if err := topic.Send(ctx, sent); err != nil {
			t.Fatal(err)
		}

		// The test will hang here if the messages aren't delivered, so use a shorter timeout.
		ctx2, cancel := context.WithTimeout(ctx, 30*time.Second)
		defer cancel()
		received, err := sub.Receive(ctx2)
		if err != nil {
			t.Fatal(err)
		}
		received.Ack()

		if diff := cmp.Diff(received.Metadata, wm); diff != "" {
			t.Fatalf("got\n%v\nwant\n%v\ndiff\n%s", received.Metadata, wm, diff)
		}
	}
	for _, wm := range weirdMetaDataGroups {
		roundTrip(wm)
	}

	// Verify that non-UTF8 strings in metadata key or value fail.
	m := &pubsub.Message{
		Body:     []byte("hello world"),
		Metadata: map[string]string{escape.NonUTF8String: "bar"},
	}
	if err := topic.Send(ctx, m); err == nil {
		t.Error("got nil error, expected error for using non-UTF8 string as metadata key")
	}
	m.Metadata = map[string]string{"foo": escape.NonUTF8String}
	if err := topic.Send(ctx, m); err == nil {
		t.Error("got nil error, expected error for using non-UTF8 string as metadata value")
	}
}

// testNonUTF8MessageBody verifies that a message body containing the weird
// strings from escape.WeirdStrings followed by non-UTF-8 bytes survives a
// send/receive round trip unchanged.
func testNonUTF8MessageBody(t *testing.T, newHarness HarnessMaker) {
	ctx := context.Background()
	harness, err := newHarness(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer harness.Close()

	topic, sub, cleanup, err := makePair(ctx, t, harness)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	// Visit the WeirdStrings map in sorted key order so that the body is the
	// same on every run (needed for record/replay consistency).
	var keys []string
	for k := range escape.WeirdStrings {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	// The body is every weird value in key order, then the non-UTF-8 bytes.
	var body []byte
	for _, k := range keys {
		body = append(body, escape.WeirdStrings[k]...)
	}
	body = append(body, escape.NonUTF8String...)

	if sendErr := topic.Send(ctx, &pubsub.Message{Body: body}); sendErr != nil {
		t.Fatal(sendErr)
	}

	// Receive blocks if the message never arrives, so bound it with a timeout.
	recvCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	received, err := sub.Receive(recvCtx)
	if err != nil {
		t.Fatal(err)
	}
	received.Ack()

	if d := cmp.Diff(received.Body, body); d != "" {
		t.Fatalf("got\n%v\nwant\n%v\ndiff\n%s", received.Body, body, d)
	}
}

// isCanceled reports whether err represents a canceled context. It returns
// true if:
//   - err is, or wraps, context.Canceled;
//   - err is, or wraps, a *retry.ContextError whose CtxErr is
//     context.Canceled; or
//   - gcerrors.Code(err) is gcerrors.Canceled.
//
// errors.Is and errors.As are used rather than == and a type assertion so
// that errors wrapped with additional context are still recognized.
func isCanceled(err error) bool {
	if errors.Is(err, context.Canceled) {
		return true
	}
	var cerr *retry.ContextError
	if errors.As(err, &cerr) {
		return cerr.CtxErr == context.Canceled
	}
	return gcerrors.Code(err) == gcerrors.Canceled
}

// makePair creates a topic and a subscription to it via the harness, and
// wraps them in a pubsub.Topic and pubsub.Subscription that use batches of
// size one.
//
// The returned cleanup function shuts down the topic and the subscription
// (reporting any shutdown error on t) and then runs the harness cleanups for
// the subscription and the topic, in that order. On error, no cleanup
// function is returned; a topic that was already created is cleaned up
// before returning.
func makePair(ctx context.Context, t *testing.T, h Harness) (*pubsub.Topic, *pubsub.Subscription, func(), error) {
	driverTopic, cleanupTopic, err := h.CreateTopic(ctx, t.Name())
	if err != nil {
		return nil, nil, nil, err
	}
	driverSub, cleanupSub, err := h.CreateSubscription(ctx, driverTopic, t.Name())
	if err != nil {
		cleanupTopic()
		return nil, nil, nil, err
	}

	var (
		topic = pubsub.NewTopic(driverTopic, batchSizeOne)
		sub   = pubsub.NewSubscription(driverSub, batchSizeOne, batchSizeOne)
	)
	return topic, sub, func() {
		if shutdownErr := topic.Shutdown(ctx); shutdownErr != nil {
			t.Error(shutdownErr)
		}
		if shutdownErr := sub.Shutdown(ctx); shutdownErr != nil {
			t.Error(shutdownErr)
		}
		cleanupSub()
		cleanupTopic()
	}, nil
}

// testAs tests the various As functions, using AsTest.
//
// It runs the topic and subscription checks, sends one message with the
// AsTest's BeforeSend/AfterSend hooks and runs the message check on what is
// received, and finally provokes NotFound errors from a nonexistent topic
// and a nonexistent subscription to drive the two error checks.
func testAs(t *testing.T, newHarness HarnessMaker, st AsTest) {
	ctx := context.Background()
	harness, err := newHarness(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer harness.Close()
	topic, sub, cleanup, err := makePair(ctx, t, harness)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	if checkErr := st.TopicCheck(topic); checkErr != nil {
		t.Error(checkErr)
	}
	if checkErr := st.SubscriptionCheck(sub); checkErr != nil {
		t.Error(checkErr)
	}

	outgoing := &pubsub.Message{
		Body:       []byte("x"),
		BeforeSend: st.BeforeSend,
		AfterSend:  st.AfterSend,
	}
	if sendErr := topic.Send(ctx, outgoing); sendErr != nil {
		t.Fatal(sendErr)
	}
	// Receive blocks if the message never arrives, so bound it with a timeout.
	recvCtx, cancelRecv := context.WithTimeout(ctx, 30*time.Second)
	defer cancelRecv()
	incoming, err := sub.Receive(recvCtx)
	if err != nil {
		t.Fatal(err)
	}
	if checkErr := st.MessageCheck(incoming); checkErr != nil {
		t.Error(checkErr)
	}
	incoming.Ack()

	// Send on a nonexistent topic to obtain an error for TopicErrorCheck.
	driverTopic, err := harness.MakeNonexistentTopic(ctx)
	if err != nil {
		t.Fatal(err)
	}
	nonexistentTopic := pubsub.NewTopic(driverTopic, batchSizeOne)
	defer func() {
		if shutdownErr := nonexistentTopic.Shutdown(ctx); shutdownErr != nil {
			t.Error(shutdownErr)
		}
	}()
	// Bound the send in case it does not fail quickly.
	sendCtx, cancelSend := context.WithTimeout(ctx, 30*time.Second)
	defer cancelSend()
	topicErr := nonexistentTopic.Send(sendCtx, &pubsub.Message{})
	topicNotFound := topicErr != nil && gcerrors.Code(topicErr) == gcerrors.NotFound
	if !topicNotFound {
		t.Errorf("got error %v sending to nonexistent topic, want Code=NotFound", topicErr)
	} else if checkErr := st.TopicErrorCheck(topic, topicErr); checkErr != nil {
		// NOTE(review): the check receives the working topic rather than
		// nonexistentTopic, unlike the subscription case below, which passes
		// the nonexistent subscription. Confirm whether this is intended.
		t.Error(checkErr)
	}

	// Receive on a nonexistent subscription to obtain an error for
	// SubscriptionErrorCheck.
	driverSub, subCleanup, err := harness.MakeNonexistentSubscription(ctx)
	if err != nil {
		t.Fatal(err)
	}
	defer subCleanup()
	nonExistentSub := pubsub.NewSubscription(driverSub, batchSizeOne, batchSizeOne)
	defer func() {
		if shutdownErr := nonExistentSub.Shutdown(ctx); shutdownErr != nil {
			t.Error(shutdownErr)
		}
	}()

	// Bound the receive in case it does not fail quickly.
	failCtx, cancelFail := context.WithTimeout(ctx, 30*time.Second)
	defer cancelFail()
	_, subErr := nonExistentSub.Receive(failCtx)
	subNotFound := subErr != nil && failCtx.Err() == nil && gcerrors.Code(subErr) == gcerrors.NotFound
	if !subNotFound {
		t.Errorf("got error %v receiving from nonexistent subscription, want Code=NotFound", subErr)
	} else if checkErr := st.SubscriptionErrorCheck(nonExistentSub, subErr); checkErr != nil {
		t.Error(checkErr)
	}
}

// benchmark publishes a large number of messages to topic concurrently and
// then receives them all from sub, once per benchmark iteration. The
// benchmark timer runs only during the sending phase when timeSend is true,
// and only during the receiving phase when timeSend is false.
func benchmark(b *testing.B, topic *pubsub.Topic, sub *pubsub.Subscription, timeSend bool) {
	const (
		nMessages          = 10000
		concurrencySend    = 100
		concurrencyReceive = 100
	)
	metadata := map[string]string{"label": "value"}
	payload := []byte("hello, world")

	if nMessages%concurrencySend != 0 || nMessages%concurrencyReceive != 0 {
		b.Fatal("nMessages must be divisible by # of sending/receiving goroutines")
	}

	timeReceive := !timeSend
	b.ResetTimer()
	for iter := 0; iter < b.N; iter++ {
		// Sending phase: the timer is already running here, so pause it
		// when only receiving is being measured.
		if timeReceive {
			b.StopTimer()
		}
		if err := publishNConcurrently(topic, nMessages, concurrencySend, metadata, payload); err != nil {
			b.Fatalf("publishing: %v", err)
		}
		b.Logf("published %d messages", nMessages)

		// Receiving phase: flip the timer to the opposite state.
		switch {
		case timeSend:
			b.StopTimer()
		default:
			b.StartTimer()
		}
		if err := receiveNConcurrently(sub, nMessages, concurrencyReceive); err != nil {
			b.Fatalf("receiving: %v", err)
		}
		b.SetBytes(nMessages * 1e6)
		b.Log("MB/s is actually number of messages received per second")

		// Resume timing for the next iteration's sending phase.
		if timeSend {
			b.StartTimer()
		}
	}
}

// publishNConcurrently sends nMessages messages to topic, spread across
// nGoroutines goroutines. Every message carries attrs as its metadata and
// body as its body. It returns the error reported by runConcurrently.
func publishNConcurrently(topic *pubsub.Topic, nMessages, nGoroutines int, attrs map[string]string, body []byte) error {
	sendOne := func(ctx context.Context) error {
		msg := &pubsub.Message{Metadata: attrs, Body: body}
		return topic.Send(ctx, msg)
	}
	return runConcurrently(nMessages, nGoroutines, sendOne)
}

// receiveNConcurrently receives and acks nMessages messages from sub, spread
// across nGoroutines goroutines. It returns the error reported by
// runConcurrently.
func receiveNConcurrently(sub *pubsub.Subscription, nMessages, nGoroutines int) error {
	receiveOne := func(ctx context.Context) error {
		msg, err := sub.Receive(ctx)
		if err == nil {
			msg.Ack()
		}
		return err
	}
	return runConcurrently(nMessages, nGoroutines, receiveOne)
}

// runConcurrently calls f a total of n times, split evenly across g
// goroutines (n/g sequential calls each), and waits for all of them to
// finish. g must divide n. Every call receives a context derived from an
// errgroup, so the first failure cancels the context seen by the remaining
// calls; the value returned is the result of the errgroup's Wait.
func runConcurrently(n, g int, f func(context.Context) error) error {
	group, ctx := errgroup.WithContext(context.Background())
	callsPerWorker := n / g

	// worker makes this goroutine's share of calls, stopping at the first error.
	worker := func() error {
		for remaining := callsPerWorker; remaining > 0; remaining-- {
			if err := f(ctx); err != nil {
				return err
			}
		}
		return nil
	}

	for started := 0; started < g; started++ {
		group.Go(worker)
	}
	return group.Wait()
}
